{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import sys\n",
    "# 如果当前代码文件运行测试需要加入修改路径，避免出现后导包问题\n",
    "BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))\n",
    "sys.path.insert(0, os.path.join(BASE_DIR))\n",
    "\n",
    "PYSPARK_PYTHON = \"/miniconda2/envs/reco_sys/bin/python\"\n",
    "# 当存在多个版本时，不指定很可能会导致出错\n",
    "os.environ[\"PYSPARK_PYTHON\"] = PYSPARK_PYTHON\n",
    "os.environ[\"PYSPARK_DRIVER_PYTHON\"] = PYSPARK_PYTHON\n",
    "\n",
    "from offline import SparkSessionBase\n",
    "\n",
    "class UpdateRecall(SparkSessionBase):\n",
    "\n",
    "    SPARK_APP_NAME = \"updateRecall\"\n",
    "    ENABLE_HIVE_SUPPORT = True\n",
    "\n",
    "    def __init__(self):\n",
    "        self.spark = self._create_spark_session()\n",
    "\n",
    "ur = UpdateRecall()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+-------------------+-------+\n",
      "|            user_id|         article_id|clicked|\n",
      "+-------------------+-------------------+-------+\n",
      "|1105045287866466304|              14225|  false|\n",
      "|1106476833370537984|              14208|  false|\n",
      "|1109980466942836736|              19233|  false|\n",
      "|1109980466942836736|              44737|  false|\n",
      "|1109993249109442560|              17283|  false|\n",
      "|1111189494544990208|              19322|  false|\n",
      "|1111524501104885760|              44161|  false|\n",
      "|1112727762809913344|              18172|   true|\n",
      "|1113020831425888256|1112592065390182400|  false|\n",
      "|1114863735962337280|              17665|  false|\n",
      "|1114863741448486912|              14208|  false|\n",
      "|1114863751909081088|              13751|  false|\n",
      "|1114863846486441984|              17940|  false|\n",
      "|1114863941936218112|              15196|  false|\n",
      "|1114863998437687296|              19233|  false|\n",
      "|1114864164158832640|             141431|  false|\n",
      "|1114864237131333632|              13797|  false|\n",
      "|1114864354622177280|             134812|  false|\n",
      "|1115089292662669312|1112608068731928576|  false|\n",
      "|1115534909935452160|              18156|  false|\n",
      "+-------------------+-------------------+-------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# - 1、数据类型转换,clicked以及用户ID与文章ID处理\n",
    "ur.spark.sql('use profile')\n",
    "user_article_basic = ur.spark.sql(\"select user_id, article_id, clicked from user_article_basic\")\n",
    "user_article_basic.show()\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def convert_boolean_int(row):\n",
    "    return row.user_id, row.article_id, int(row.clicked)\n",
    "    \n",
    "    \n",
    "user_article_basic = user_article_basic.rdd.map(convert_boolean_int).toDF(['user_id', 'article_id', 'clicked'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+-------------------+-------+\n",
      "|            user_id|         article_id|clicked|\n",
      "+-------------------+-------------------+-------+\n",
      "|1105045287866466304|              14225|      0|\n",
      "|1106476833370537984|              14208|      0|\n",
      "|1109980466942836736|              19233|      0|\n",
      "|1109980466942836736|              44737|      0|\n",
      "|1109993249109442560|              17283|      0|\n",
      "|1111189494544990208|              19322|      0|\n",
      "|1111524501104885760|              44161|      0|\n",
      "|1112727762809913344|              18172|      1|\n",
      "|1113020831425888256|1112592065390182400|      0|\n",
      "|1114863735962337280|              17665|      0|\n",
      "|1114863741448486912|              14208|      0|\n",
      "|1114863751909081088|              13751|      0|\n",
      "|1114863846486441984|              17940|      0|\n",
      "|1114863941936218112|              15196|      0|\n",
      "|1114863998437687296|              19233|      0|\n",
      "|1114864164158832640|             141431|      0|\n",
      "|1114864237131333632|              13797|      0|\n",
      "|1114864354622177280|             134812|      0|\n",
      "|1115089292662669312|1112608068731928576|      0|\n",
      "|1115534909935452160|              18156|      0|\n",
      "+-------------------+-------------------+-------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "user_article_basic.show()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 对用户ID和文章ID进行，索引建立，\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.ml import Pipeline\n",
    "\n",
    "user_indexer = StringIndexer(inputCol='user_id', outputCol='als_user_id')\n",
    "article_indexer = StringIndexer(inputCol='article_id', outputCol='als_article_id')\n",
    "pip = Pipeline(stages=[user_indexer, article_indexer])\n",
    "pip_model = pip.fit(user_article_basic)\n",
    "als_user_article = pip_model.transform(user_article_basic)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+-------------------+-------+-----------+--------------+\n",
      "|            user_id|         article_id|clicked|als_user_id|als_article_id|\n",
      "+-------------------+-------------------+-------+-----------+--------------+\n",
      "|1105045287866466304|              14225|      0|       28.0|          13.0|\n",
      "|1106476833370537984|              14208|      0|       14.0|           1.0|\n",
      "|1109980466942836736|              19233|      0|       60.0|          19.0|\n",
      "|1109980466942836736|              44737|      0|       60.0|          17.0|\n",
      "|1109993249109442560|              17283|      0|       48.0|           7.0|\n",
      "|1111189494544990208|              19322|      0|        7.0|         140.0|\n",
      "|1111524501104885760|              44161|      0|       49.0|          11.0|\n",
      "|1112727762809913344|              18172|      1|       45.0|          55.0|\n",
      "|1113020831425888256|1112592065390182400|      0|       71.0|          29.0|\n",
      "|1114863735962337280|              17665|      0|        9.0|           5.0|\n",
      "|1114863741448486912|              14208|      0|       62.0|           1.0|\n",
      "|1114863751909081088|              13751|      0|       37.0|          37.0|\n",
      "|1114863846486441984|              17940|      0|        2.0|          73.0|\n",
      "|1114863941936218112|              15196|      0|       20.0|          33.0|\n",
      "|1114863998437687296|              19233|      0|       24.0|          19.0|\n",
      "|1114864164158832640|             141431|      0|       18.0|          51.0|\n",
      "|1114864237131333632|              13797|      0|        4.0|          50.0|\n",
      "|1114864354622177280|             134812|      0|       12.0|          12.0|\n",
      "|1115089292662669312|1112608068731928576|      0|       13.0|          24.0|\n",
      "|1115534909935452160|              18156|      0|       42.0|          53.0|\n",
      "+-------------------+-------------------+-------+-----------+--------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "als_user_article.show()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "#- 2、ALS模型训练以及推荐\n",
    "from pyspark.ml.recommendation import ALS\n",
    "als = ALS(userCol='als_user_id', itemCol='als_article_id', ratingCol='clicked')\n",
    "als_model = als.fit(als_user_article)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------+--------------------+\n",
      "|als_user_id|     recommendations|\n",
      "+-----------+--------------------+\n",
      "|         31|[[255,0.17741075]...|\n",
      "|         65|[[0,0.0], [10,0.0...|\n",
      "|         53|[[0,0.0], [10,0.0...|\n",
      "|         34|[[255,0.06449647]...|\n",
      "|         28|[[255,0.010355139...|\n",
      "|         26|[[255,0.094471075...|\n",
      "|         27|[[255,0.1838476],...|\n",
      "|         44|[[255,0.19231558]...|\n",
      "|         12|[[255,0.42547882]...|\n",
      "|         22|[[255,0.14787017]...|\n",
      "|         47|[[255,0.18815793]...|\n",
      "|          1|[[255,0.22982743]...|\n",
      "|         52|[[255,0.24817318]...|\n",
      "|         13|[[255,0.1511052],...|\n",
      "|          6|[[255,0.262215], ...|\n",
      "|         16|[[255,0.2500384],...|\n",
      "|          3|[[207,0.91574323]...|\n",
      "|         20|[[255,0.21161154]...|\n",
      "|         40|[[255,0.06386929]...|\n",
      "|         57|[[255,0.18380986]...|\n",
      "+-----------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recall_res = als_model.recommendForAllUsers(100)\n",
    "recall_res.show()\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+-----------+\n",
      "|            user_id|als_user_id|\n",
      "+-------------------+-----------+\n",
      "|1106473203766657024|       26.0|\n",
      "|1113049054452908032|       44.0|\n",
      "|1114863751909081088|       37.0|\n",
      "|1115534909935452160|       42.0|\n",
      "|1113100263847100416|       54.0|\n",
      "|1103195673450250240|        5.0|\n",
      "|1105045287866466304|       28.0|\n",
      "|1114864237131333632|        4.0|\n",
      "|1111524501104885760|       49.0|\n",
      "|1109995264376045568|       19.0|\n",
      "|1105105185656537088|       46.0|\n",
      "|1110071654421102592|       64.0|\n",
      "|1114863965080387584|       65.0|\n",
      "|1114864128259784704|       17.0|\n",
      "|1114864233264185344|       40.0|\n",
      "|1115436666438287360|       29.0|\n",
      "|1114863846486441984|        2.0|\n",
      "|1115089292662669312|       13.0|\n",
      "|1113316420155867136|       72.0|\n",
      "|1114863902073552896|       16.0|\n",
      "+-------------------+-----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# - 3、推荐结果解析处理\n",
    "# 建立用户真实ID与用户索引的映射，文章真实ID与文章索引的映射\n",
    "user_real_index = als_user_article.groupBy(['user_id']).max('als_user_id').withColumnRenamed('max(als_user_id)', 'als_user_id')\n",
    "\n",
    "# user_real_index.show()\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "article_real_index = als_user_article.groupBy(['article_id']).max('als_article_id').withColumnRenamed('max(als_article_id)', 'als_article_id')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "recall_res = recall_res.join(user_real_index, on=['als_user_id'], how='left').select(['als_user_id', 'recommendations', 'user_id'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------+--------------------+-------------------+\n",
      "|als_user_id|     recommendations|            user_id|\n",
      "+-----------+--------------------+-------------------+\n",
      "|          8|[[263,0.3481275],...|1109976363453906944|\n",
      "|         67|[[255,0.4729983],...|1114096769035141120|\n",
      "|         70|[[255,0.38408458]...|1115534898262704128|\n",
      "|          0|[[255,0.58535063]...|1106396183141548032|\n",
      "|         69|[[0,0.0], [10,0.0...|1114094806092480512|\n",
      "|          7|[[255,0.18091725]...|1111189494544990208|\n",
      "|         49|[[0,0.0], [10,0.0...|1111524501104885760|\n",
      "|         29|[[255,0.10471068]...|1115436666438287360|\n",
      "|         64|[[255,0.09094194]...|1110071654421102592|\n",
      "|         47|[[255,0.18815793]...|1112995431274512384|\n",
      "|         42|[[255,0.24995728]...|1115534909935452160|\n",
      "|         44|[[255,0.19231558]...|1113049054452908032|\n",
      "|         35|[[207,0.6278385],...|                  4|\n",
      "|         62|[[255,0.10057049]...|1114863741448486912|\n",
      "|         18|[[255,0.20005049]...|1114864164158832640|\n",
      "|          1|[[255,0.22982743]...|1114864874141253632|\n",
      "|         39|[[255,0.22850463]...|1115534631668547584|\n",
      "|         34|[[255,0.06449647]...|1108264901190615040|\n",
      "|         37|[[255,0.52489614]...|1114863751909081088|\n",
      "|         25|[[255,0.37197587]...|1114865014205841408|\n",
      "+-----------+--------------------+-------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recall_res.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 对于文章推荐的解析\n",
    "import pyspark.sql.functions as F\n",
    "\n",
    "recall_res = recall_res.withColumn('als_article_id', F.explode('recommendations')).drop('recommendations').select(['user_id', 'als_article_id'])\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+-----------------+\n",
      "|            user_id|   als_article_id|\n",
      "+-------------------+-----------------+\n",
      "|1109976363453906944|  [263,0.3481275]|\n",
      "|1109976363453906944| [181,0.20810685]|\n",
      "|1109976363453906944| [255,0.20628238]|\n",
      "|1109976363453906944| [307,0.20628238]|\n",
      "|1109976363453906944| [323,0.20628238]|\n",
      "|1109976363453906944| [293,0.20628238]|\n",
      "|1109976363453906944| [336,0.19855197]|\n",
      "|1109976363453906944|   [164,0.104869]|\n",
      "|1109976363453906944|[207,0.104758695]|\n",
      "|1109976363453906944| [224,0.10435731]|\n",
      "|1109976363453906944| [210,0.09976264]|\n",
      "|1109976363453906944| [204,0.09689172]|\n",
      "|1109976363453906944| [184,0.09689172]|\n",
      "|1109976363453906944| [125,0.08567983]|\n",
      "|1109976363453906944|[149,0.081252605]|\n",
      "|1109976363453906944| [327,0.07774757]|\n",
      "|1109976363453906944| [341,0.07774757]|\n",
      "|1109976363453906944| [299,0.07774757]|\n",
      "|1109976363453906944| [305,0.07774757]|\n",
      "|1109976363453906944| [275,0.07774757]|\n",
      "+-------------------+-----------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recall_res.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_article_index(row):\n",
    "    return row.user_id, row.als_article_id[0]\n",
    "\n",
    "recall_res = recall_res.rdd.map(get_article_index).toDF(['user_id', 'als_article_id'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+--------------+\n",
      "|            user_id|als_article_id|\n",
      "+-------------------+--------------+\n",
      "|1109976363453906944|           263|\n",
      "|1109976363453906944|           181|\n",
      "|1109976363453906944|           255|\n",
      "|1109976363453906944|           307|\n",
      "|1109976363453906944|           323|\n",
      "|1109976363453906944|           293|\n",
      "|1109976363453906944|           336|\n",
      "|1109976363453906944|           164|\n",
      "|1109976363453906944|           207|\n",
      "|1109976363453906944|           224|\n",
      "|1109976363453906944|           210|\n",
      "|1109976363453906944|           204|\n",
      "|1109976363453906944|           184|\n",
      "|1109976363453906944|           125|\n",
      "|1109976363453906944|           149|\n",
      "|1109976363453906944|           327|\n",
      "|1109976363453906944|           341|\n",
      "|1109976363453906944|           299|\n",
      "|1109976363453906944|           305|\n",
      "|1109976363453906944|           275|\n",
      "+-------------------+--------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recall_res.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "recall_res = recall_res.join(article_real_index, on=['als_article_id'], how='left').select(['user_id', 'article_id'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+----------+\n",
      "|            user_id|article_id|\n",
      "+-------------------+----------+\n",
      "|1108264901190615040|     13890|\n",
      "|1114863751909081088|     13890|\n",
      "|1114865014205841408|     13890|\n",
      "|                 10|     13890|\n",
      "|                  5|     13890|\n",
      "|1109995683777085440|     13890|\n",
      "|1114864233264185344|     13890|\n",
      "|1115089292662669312|     13890|\n",
      "|1114864474352779264|     13890|\n",
      "|1114865875103514624|     13890|\n",
      "|1114863941936218112|     13890|\n",
      "|1113004557979353088|     13890|\n",
      "|1114863748553637888|     13890|\n",
      "|1103195673450250240|     13890|\n",
      "|1114865402044743680|     13890|\n",
      "|1114863998437687296|     13890|\n",
      "|                 33|     13890|\n",
      "|1114863735962337280|     13890|\n",
      "|1106473203766657024|     13890|\n",
      "|1114863902073552896|     13890|\n",
      "+-------------------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# - 4、推荐结果存储\n",
    "# 获取每个文章对应的频道，推荐给用户时按照频道存储\n",
    "recall_res.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "#找到每个文章对应的频道，然后按照频道分组\n",
    "ur.spark.sql('use article')\n",
    "\n",
    "article_data = ur.spark.sql(\"select article_id, channel_id from article_data\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "recall_channel = recall_res.join(article_data, on=['article_id'], how='left')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-------------------+----------+\n",
      "|article_id|            user_id|channel_id|\n",
      "+----------+-------------------+----------+\n",
      "|     13401|1114094806092480512|        18|\n",
      "|     13401|1111524501104885760|        18|\n",
      "|     13401|1114866560301793280|        18|\n",
      "|     13401|1113316420155867136|        18|\n",
      "|     13401|1109984273839947776|        18|\n",
      "|     13401|1114865682668847104|        18|\n",
      "|     13401|1114863965080387584|        18|\n",
      "|     14805|1105045287866466304|        18|\n",
      "|     14805|1114863846486441984|        18|\n",
      "|     14805|1115535317173010432|        18|\n",
      "|     14805|1114864128259784704|        18|\n",
      "|     14805|1114871412419461120|        18|\n",
      "|     14805|1114863759672737792|        18|\n",
      "|     14805|                 10|        18|\n",
      "|     14805|                  5|        18|\n",
      "|     14805|1109995683777085440|        18|\n",
      "|     14805|1114864233264185344|        18|\n",
      "|     14805|1115089292662669312|        18|\n",
      "|     14805|1114864474352779264|        18|\n",
      "|     14805|1114865875103514624|        18|\n",
      "+----------+-------------------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recall_channel.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "recall_channel = recall_channel.groupBy(['user_id', 'channel_id']).agg(F.collect_list('article_id')).withColumnRenamed('collect_list(article_id)', 'article_list')\n",
    "\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+----------+--------------------+\n",
      "|            user_id|channel_id|        article_list|\n",
      "+-------------------+----------+--------------------+\n",
      "|                 23|        18|[14805, 14839, 17...|\n",
      "|1109993249109442560|         7|            [141437]|\n",
      "|1113049054452908032|         7|            [141437]|\n",
      "|1113100263847100416|         5|            [141440]|\n",
      "|1114863751909081088|         7|            [141437]|\n",
      "|                 38|        13|    [141431, 141431]|\n",
      "|1114864233264185344|        13|    [141431, 141431]|\n",
      "|                 10|         5|            [141440]|\n",
      "|1106473203766657024|         7|            [141437]|\n",
      "|                 33|        18|[14805, 14839, 17...|\n",
      "|1106473203766657024|      null|[1112593324574769...|\n",
      "|                  1|         7|            [141437]|\n",
      "|1105093883106164736|        13|    [141431, 141431]|\n",
      "|1114866560301793280|         7|            [141469]|\n",
      "|1114864434305564672|         7|            [141437]|\n",
      "|1105093883106164736|         5|            [141440]|\n",
      "|1114863759672737792|         5|            [141440]|\n",
      "|1113100263847100416|         7|            [141437]|\n",
      "|1114863991156375552|        13|    [141431, 141431]|\n",
      "|1113049054452908032|      null|[1112593242529988...|\n",
      "+-------------------+----------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "recall_channel.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 基于内容的召回， 用户的点击行为\n",
    "\n",
    "ur.spark.sql('use profile')\n",
    "user_article_basic = ur.spark.sql(\"select * from user_article_basic\")\n",
    "user_article_basic = user_article_basic.filter('clicked=True')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+\n",
      "|            user_id|        action_time|         article_id|channel_id|shared|clicked|collected|exposure|read_time|\n",
      "+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+\n",
      "|1112727762809913344|2019-04-03 12:51:57|              18172|        18| false|   true|     true|    true|    19413|\n",
      "|                  1|2019-03-07 16:57:34|              44386|        18| false|   true|    false|    true|    17850|\n",
      "|1109976363453906944|2019-03-25 11:52:31|              13728|        18| false|   true|    false|    true|    14218|\n",
      "|1114864354622177280|2019-04-09 16:39:22|              17304|        18| false|   true|    false|    true|         |\n",
      "|                 23|2019-04-03 08:10:23|              44739|        18| false|   true|    false|    true|     7013|\n",
      "|                  1|2019-03-17 10:32:01|              17632|        18| false|   true|    false|    true|     1014|\n",
      "|1114863748553637888|2019-04-09 16:41:08|             141437|         7| false|   true|    false|    true|     2066|\n",
      "|1109994594201763840|2019-04-06 23:56:56|              15140|        18| false|   true|    false|    true|     1433|\n",
      "|1112715153402494976|2019-04-01 22:01:14|              17542|        18| false|   true|    false|    true|    20092|\n",
      "|1114863751909081088|2019-04-09 16:40:43|              15139|        18| false|   true|    false|    true|         |\n",
      "|                  2|2019-03-05 10:19:54|              44371|        18| false|   true|    false|    true|      938|\n",
      "|                 23|2019-04-02 15:06:37|1112593242529988608|         3| false|   true|    false|    true|     3366|\n",
      "|                  4|2019-04-04 14:28:19|1112525856586072064|         7| false|   true|     true|    true|    54151|\n",
      "|1114863751909081088|2019-04-07 20:13:33|1112608068731928576|         3| false|   true|    false|    true|     1711|\n",
      "|1114863941936218112|2019-04-07 20:24:40|              18795|        18| false|   true|    false|    true|    57949|\n",
      "|1114864354622177280|2019-04-09 16:41:05|              18156|        18| false|   true|    false|    true|     6901|\n",
      "|1114865014205841408|2019-04-09 16:42:47|             141437|         7| false|   true|    false|    true|     5091|\n",
      "|                  2|2019-03-07 10:06:20|              18103|        18| false|   true|    false|    true|      648|\n",
      "|                  1|2019-03-22 00:52:31|              18335|        18| false|   true|    false|    true|    19983|\n",
      "|                  1|2019-03-23 11:31:24|1108924834420621312|        18| false|   true|    false|    true|     3131|\n",
      "+-------------------+-------------------+-------------------+----------+------+-------+---------+--------+---------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "user_article_basic.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_clicked_article_similar(partition):\n",
    "    \"\"\"召回用户点击的文章当中相似的文章推荐\n",
    "    \"\"\"\n",
    "    import happybase\n",
    "    pool = happybase.ConnectionPool(size=10, host='hadoop-master')\n",
    "    \n",
    "    with pool.connection() as conn:\n",
    "        similar_table = conn.table('article_similar')\n",
    "        for row in partition:\n",
    "            # 获取相似文章结果表\n",
    "            similar_article = similar_table.row(str(row.article_id).encode(),\n",
    "                                                columns=[b'similar'])\n",
    "            # 相似文章相似度排序过滤，召回不需要太大的数据， 百个，千\n",
    "            _srt = sorted(similar_article.items(), key=lambda item: item[1], reverse=True)\n",
    "            if _srt:\n",
    "                # 每次行为推荐10篇文章\n",
    "                reco_article = [int(i[0].split(b':')[1]) for i in _srt][:10]\n",
    "            \n",
    "                # 过滤历史\n",
    "                \n",
    "                # 存储\n",
    "                 # 获取历史看过的该频道文章\n",
    "                history_table = conn.table('history_recall')\n",
    "                # 多个版本\n",
    "                data = history_table.cells('reco:his:{}'.format(row.user_id).encode(),\n",
    "                                           'channel:{}'.format(row.channel_id).encode())\n",
    "\n",
    "                history = []\n",
    "                if len(data) >= 2:\n",
    "                    for l in data[:-1]:\n",
    "                        history.extend(eval(l))\n",
    "                else:\n",
    "                    history = []\n",
    "\n",
    "                # 过滤reco_article与history\n",
    "                reco_res = list(set(reco_article) - set(history))\n",
    "\n",
    "                # 进行推荐，放入基于内容的召回表当中以及历史看过的文章表当中\n",
    "                if reco_res:\n",
    "                    # content_table = conn.table('cb_content_recall')\n",
    "                    content_table = conn.table('cb_recall')\n",
    "                    content_table.put(\"recall:user:{}\".format(row.user_id).encode(),\n",
    "                                      {'content:{}'.format(row.channel_id).encode(): str(reco_res).encode()})\n",
    "\n",
    "                    # 放入历史推荐过文章\n",
    "                    history_table.put(\"reco:his:{}\".format(row.user_id).encode(),\n",
    "                                      {'channel:{}'.format(row.channel_id).encode(): str(reco_res).encode()})\n",
    "\n",
    "\n",
    "user_article_basic.foreachPartition(get_clicked_article_similar)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
